package com.atguigu.day10;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Demonstrates Table API group windows on a small, bounded stream of {@link WaterSensor} readings.
 *
 * <p>The active pipeline assigns event-time timestamps from {@code getTs()}, exposes the {@code ts}
 * field as the rowtime attribute, applies a sliding window (size 3 seconds, slide 2 seconds),
 * groups by sensor id and window, and prints the per-window sum of {@code vc} together with the
 * window start and end. Alternative sources and window definitions are kept as commented-out
 * snippets for experimentation.
 */
public class Flink06_TableAPI_GroupWindow {
    public static void main(String[] args) {
        // 1. Obtain the streaming execution environment; parallelism 1 keeps the printed output in one task.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // 2. Obtain the table environment bound to the streaming environment.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        // Event-time strategy: tolerate 5 seconds of out-of-orderness, timestamp taken from the reading itself.
        WatermarkStrategy<WaterSensor> eventTimeStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs());

        // Bounded demo input.
        SingleOutputStreamOperator<WaterSensor> sensorReadings = streamEnv
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60))
                .assignTimestampsAndWatermarks(eventTimeStrategy);

        // Alternative unbounded source: read "id,ts,vc" lines from a socket (3 seconds of out-of-orderness).
        // SingleOutputStreamOperator<WaterSensor> sensorReadings = streamEnv.socketTextStream("localhost", 9999)
        //         .map(new MapFunction<String, WaterSensor>() {
        //             @Override
        //             public WaterSensor map(String value) throws Exception {
        //                 String[] split = value.split(",");
        //                 return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
        //             }
        //         })
        //         .assignTimestampsAndWatermarks(
        //                 WatermarkStrategy
        //                         .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        //                         .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
        //         );

        // 3. Convert the stream into a table, marking "ts" as the event-time (rowtime) attribute.
        Table sensorTable = tEnv.fromDataStream(sensorReadings, $("id"), $("ts").rowtime(), $("vc"));
        // Processing-time variant: append a "pt" proctime attribute instead.
        // Table sensorTable = tEnv.fromDataStream(sensorReadings, $("id"), $("ts"), $("vc"), $("pt").proctime());

        // 4. Apply a group window. Other window definitions that can replace the .window(...) call below:
        //    Event-time tumbling window:
        //      .window(Tumble.over(lit(3).seconds()).on($("ts")).as("w"))
        //    Event-time session window:
        //      .window(Session.withGap(lit(2).seconds()).on($("ts")).as("w"))
        //    Processing-time tumbling window (note: pointless on bounded data; normally used on unbounded data):
        //      .window(Tumble.over(lit(3).seconds()).on($("pt")).as("w"))
        //    Count-based window (note: a time field must be given, and it must be the processing-time one):
        //      .window(Tumble.over(rowInterval(2L)).on($("pt")).as("w"))
        //      .window(Tumble.over(rowInterval(2L)).on($("ts")).as("w"))
        //    Projection without the window bounds:
        //      .select($("id"), $("vc").sum().as("vcSum"))
        Table windowedSums = sensorTable
                // Event-time sliding window: size 3 seconds, slide 2 seconds.
                .window(Slide.over(lit(3).seconds()).every(lit(2).second()).on($("ts")).as("w"))
                .groupBy($("id"), $("w"))
                .select($("id"), $("vc").sum().as("vcSum"), $("w").start(), $("w").end());

        // Run the query and print its result rows.
        windowedSums.execute().print();
    }
}
